package kafka;

import Util.Const;
import Util.Topic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;

/**
 * Enum-based singleton holding a shared {@link KafkaConsumer}, plus static helpers
 * for building consumer configuration maps and for reconciling offsets stored
 * externally (in MongoDB) with the offsets currently available in Kafka.
 * <p>
 * Created by hy on 2017/01/17 0017.
 */
public enum MyKafkaUtil {

    INSTANCE {
        /** Lazily created shared consumer; creation is guarded by the synchronized getter. */
        private KafkaConsumer<String, String> consumer;

        @Override
        protected synchronized KafkaConsumer<String, String> getInstance() {
            // synchronized: the previous unsynchronized check-then-act allowed two
            // concurrent callers to each construct a KafkaConsumer, leaking one.
            if (consumer == null) {
                consumer = new KafkaConsumer<>(initProperties());
            }
            return consumer;
        }
    };

    /** Supplies the singleton consumer (implemented by the INSTANCE constant body). */
    protected abstract KafkaConsumer<String, String> getInstance();

    /** Cached configuration for the singleton consumer; populated once by {@link #initProperties()}. */
    private static Properties _properties;

    /**
     * Builds (once) and returns the configuration for the singleton consumer.
     * Synchronized so concurrent first calls cannot both populate the cache.
     *
     * @return the shared, lazily built consumer {@link Properties}
     */
    private static synchronized Properties initProperties() {
        if (_properties == null) {
            _properties = new Properties();
            _properties.put("bootstrap.servers", Const.BROKER_HOST);
            _properties.put("key.deserializer", StringDeserializer.class);
            _properties.put("value.deserializer", StringDeserializer.class);
            _properties.put("group.id", Topic.USER_BEHAVIOR_LOG + "_group");
            _properties.put("auto.commit.interval.ms", 1000);
            _properties.put("enable.auto.commit", true);
        }
        return _properties;
    }

    /**
     * Builds a Kafka consumer parameter map (e.g. for a streaming direct API).
     *
     * @param brokers comma-separated bootstrap server list
     * @param groupId consumer group id
     * @return mutable map of consumer configuration entries
     */
    public static Map<String, Object> GetKafkaParams(String brokers, String groupId) {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", groupId);
        // NOTE(review): offsets appear to be tracked externally (see GetConsumerOffset);
        // enable.auto.commit=true may conflict with that — confirm this is intended.
        kafkaParams.put("enable.auto.commit", true);
        return kafkaParams;
    }

    /**
     * Reconciles offsets recorded in MongoDB with the offset range currently valid
     * in Kafka for the given partition.
     * <ul>
     *   <li>No stored offsets ({@code consumerDBOffset == null}): start from the
     *       partition's earliest available offset.</li>
     *   <li>A stored offset outside Kafka's current [beginning, end] range (expired
     *       or topic deleted): fall back to the earliest available offset.</li>
     *   <li>Otherwise: resume from the stored offset.</li>
     * </ul>
     *
     * @param topic            the topic-partition to consume (despite the name, a {@link TopicPartition})
     * @param consumerDBOffset offsets previously persisted in MongoDB, keyed by partition; may be null
     * @return map of partition to the offset consumption should start from
     */
    public static Map<TopicPartition, Long> GetConsumerOffset(TopicPartition topic, Map<TopicPartition, Long> consumerDBOffset) {

        Map<TopicPartition, Long> offsets = new HashMap<>();
        KafkaConsumer<String, String> consumer = INSTANCE.getInstance();

        Collection<TopicPartition> topics = Collections.singletonList(topic);
        Map<TopicPartition, Long> minOffset = consumer.beginningOffsets(topics);
        Map<TopicPartition, Long> maxOffset = consumer.endOffsets(topics);
        if (consumerDBOffset == null) {
            offsets.put(topic, minOffset.get(topic));
        } else {
            for (Map.Entry<TopicPartition, Long> dbOffset : consumerDBOffset.entrySet()) {
                TopicPartition tp = dbOffset.getKey();
                Long min = minOffset.get(tp);
                Long max = maxOffset.get(tp);
                // Only `topic` was queried above, so any other partition in the DB map has
                // no min/max entry; skip it instead of NPE-ing on unboxing (previous behavior).
                if (min == null || max == null) {
                    continue;
                }
                // If the offset recorded in MongoDB no longer exists in Kafka (expired or
                // topic deleted), start consuming from Kafka's current minimum offset.
                if (dbOffset.getValue() < min || dbOffset.getValue() > max) {
                    offsets.put(tp, min);
                } else {
                    offsets.put(tp, dbOffset.getValue());
                }
            }
        }

        return offsets;
    }
}
